package cn.doitedu.rtdw.data_sync;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">多易教育</a>
 * @QQ: 657270652
 * @Date: 2023/4/10
 * @Desc: 学大数据，到多易教育
 *  广告创意信息表  和  广告信息表 ，同步到 hbase
 **/
public class SyncJob03_AdInfoTable2Hbase {
    public static void main(String[] args) {

        // 创建编程入口环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/checkpoint");
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 创建cdc映射表，读取 业务表中的广告信息表数据
        tEnv.executeSql("CREATE TABLE adinfo_source (    " +
                "      adid STRING,                                 " +
                "      name STRING,                                 " +
                "      ad_farther STRING,                           " +
                "      campain STRING,                              " +
                "     PRIMARY KEY (adid) NOT ENFORCED               " +
                "     ) WITH (                                 " +
                "     'connector' = 'mysql-cdc',               " +
                "     'hostname' = 'doitedu'   ,               " +
                "     'port' = '3306'          ,               " +
                "     'username' = 'root'      ,               " +
                "     'password' = 'root'      ,               " +
                "     'database-name' = 'realtimedw',          " +
                "     'table-name' = 'dim_adinfo'              " +
                ")");

        // 创建cdc映射表，读取 业务表中的广告创意信息表数据
        tEnv.executeSql("CREATE TABLE creative_source (    " +
                "      creative_id STRING,                     " +
                "      creative_name STRING,                   " +
                "      ad_id STRING,                           " +
                "     PRIMARY KEY (creative_id) NOT ENFORCED   " +
                "     ) WITH (                                 " +
                "     'connector' = 'mysql-cdc',               " +
                "     'hostname' = 'doitedu'   ,               " +
                "     'port' = '3306'          ,               " +
                "     'username' = 'root'      ,               " +
                "     'password' = 'root'      ,               " +
                "     'database-name' = 'realtimedw',          " +
                "     'table-name' = 'dim_ad_creative'         " +
                ")");

        // 创建hbase映射表
        tEnv.executeSql(
                "CREATE TABLE dim_ad_hbasesink( " +
                        " creative_id STRING, " +
                        " f ROW<creative_name STRING,ad_id STRING,ad_name STRING,ad_farther STRING,ad_campain STRING>, " +
                        " PRIMARY KEY (creative_id) NOT ENFORCED " +
                        ") WITH (                             " +
                        " 'connector' = 'hbase-2.2',          " +
                        " 'table-name' = 'dim_ad_info',     " +
                        " 'zookeeper.quorum' = 'doitedu:2181' " +
                        ")");



        // 两表join,并将结果插入hbase表
        tEnv.executeSql(
                " INSERT INTO dim_ad_hbasesink                                    "
                        +" WITH tmp as (                                                   "
                        +" SELECT                                                          "
                        +"   b.creative_id,                                                "
                        +"   b.creative_name,                                              "
                        +"   b.ad_id,                                                      "
                        +"   a.name as ad_name,                                            "
                        +"   a.ad_farther,                                                 "
                        +"   a.campain as ad_campain                                       "
                        +" FROM adinfo_source a join creative_source b                     "
                        +" ON a.adid=b.ad_id  )                                            "
                        +"                                                                 "
                        +" SELECT                                                          "
                        +"   creative_id,                                                  "
                        +"   ROW(creative_name,ad_id,ad_name,ad_farther,ad_campain) as f   "
                        +" FROM tmp                                                        "
        );


    }
}
